package com.example.demo.mqserver.datacenter;

/**
 * Created with IntelliJ IDEA.
 * Description
 * User: 杰
 * Date: 2023 -08 -01
 * Time: 17:36
 */

import com.example.demo.common.MqException;
import com.example.demo.mqserver.core.Binding;
import com.example.demo.mqserver.core.Exchange;
import com.example.demo.mqserver.core.MSGQueue;
import com.example.demo.mqserver.core.Message;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 使用这个类来统一管理内存中的所有数据.
 * 因为要提供给多个调用者来使用, 因此要考虑线程安全的情况.
 */
public class MemoryDataCenter {
    // key 是 exchangeName, value 是 Exchange 对象.
    private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
    // key 是 queueName, value 是 MSGQueue 对象.
    private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
    // 第一个 key 是 exchangeName, 第二个 key 是 queueName.
    private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
    // key 是 messageId, value 是 Message 对象.
    private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
    // key 是 queueName, value 是一个 Message 的链表.
    private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
    // 第一个 key 是 queueName, 第二个 key 是 messageId
    private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();

    public void insertExchange(Exchange exchange) {
        exchangeMap.put(exchange.getName(),exchange);
        System.out.println("[MemoryDataCenter] 新交换机添加完成! exchangeName=" + exchange.getName());
    }

    public Exchange getExchange(String exchangeName) {
        return exchangeMap.get(exchangeName);
    }

    public void deleteExchange(String exchangeName) {
        exchangeMap.remove(exchangeName);
        System.out.println("[MemoryDataCenter] 交换机删除成功! exchangeName=" + exchangeName);
    }

    public void insertQueue(MSGQueue queue) {
        queueMap.put(queue.getName(),queue);
        System.out.println("[MemoryDataCenter] 新队列添加完成! QueueName=" + queue.getName());
    }

    public MSGQueue getQueue(String queueName) {
        return queueMap.get(queueName);
    }

    public void deleteQueue(String queueName) {
        queueMap.remove(queueName);
        System.out.println("[MemoryDataCenter] 队列删除成功! QueueName=" + queueName);
    }

    public void insertBinding(Binding binding) throws MqException {
        // 先使用 exchangeName 查一下, 对应的哈希表是否存在, 不存在则创建.
//        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
//        if (bindingMap == null) {
//            bindingMap = new ConcurrentHashMap<>();
//            bindingsMap.put(binding.getExchangeName(),bindingMap);
//        }
        // 使用下面这一句就可以替换上面的代码
        ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),k -> new ConcurrentHashMap<>());
        synchronized (bindingMap) {
            // 在根据 queueName 查一下, 如果已经存在, 就抛出异常, 不存在才能插入.
            if (bindingMap.get(binding.getQueueName()) != null) {
                throw new MqException("[MemoryDataCenter] 绑定已经存在! exchangeName= " + binding.getExchangeName() + ", queueName=" + binding.getQueueName());
            }
            bindingMap.put(binding.getQueueName(),binding);
        }
        System.out.println("[MemoryDataCenter] 新绑定添加完成! exchangeName " + binding.getExchangeName() + ", QueueName=" + binding.getQueueName());
    }

    // 获取绑定 两个版本
    // 1. 根据 exchangeName 和 queueName 确定唯一一个 Binding
    // 2. 根据 exchangeName 获取到所有的 Binding
    public Binding getBinding(String exchangeName, String queueName) {
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
        if (bindingMap == null) {
            return null;
        }
        return bindingMap.get(queueName);
    }

    public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) {
        return bindingsMap.get(exchangeName);
    }

    // 删除绑定
    public void deleteBinding(Binding binding) throws MqException {
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
        if (bindingsMap == null) {
            // 代码到这里表示这个交换机没有绑定任何队列.
            throw new MqException("[MemoryDataCenter] 绑定不存在! exchangeName=" + binding.getExchangeName() +
                    ",queueName=" + binding.getQueueName());
        }
        bindingMap.remove(binding.getQueueName());
        System.out.println("[MemoryDataCenter] 绑定删除完成! exchangeName " + binding.getExchangeName() + ", QueueName=" + binding.getQueueName());
    }

    // 添加消息
    public void addMessage(Message message) {
        messageMap.put(message.getMessageId(),message);
        System.out.println("[MemoryDataCenter] 新消息添加完成! messageId=" + message.getMessageId());
    }

    // 根据 id 查询消息
    public Message getMessage(String messageId) {
        return messageMap.get(messageId);
    }

    // 根据 id 删除消息
    public void removeMessage(String messageId) {
        messageMap.remove(messageId);
        System.out.println("[MemoryDataCenter] 消息删除成功! messageId=" + messageId);
    }

    // 发送消息到指定队列
    public void sendMessage(MSGQueue queue, Message message) {
        // 把消息放到对应的队列数据结构中.
        // 先根据队列的名字, 找到该队列对应的消息链表.
        LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(),k -> new LinkedList<>());
        synchronized (messages) {
            // 把数据加到 messages 里面
            messages.add(message);
        }
        // 把该消息也往消息中心插入一下. (如果消息已经存在, 重复插入也没事)
        addMessage(message);
        System.out.println("[MemoryDataCenter] 消息被投递到队列中 messageId=" + message.getMessageId());
    }
    // 从队列中取消息
    public Message pollMessage(String queueName) {
        // 根据队列名, 查找对应的对应的消息链表
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        if (messages == null) {
            return null;
        }
        synchronized (messages) {
            if (messages.size() == 0) {
                return null;
            }
            // 链表中有元素就进行头删.
            Message currentMessage = messages.remove(0);
            System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId+" + currentMessage.getMessageId());
            return currentMessage;
        }
    }
    // 获取指定队列中消息的个数
    public int getMessageCount(String queueName) {
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        if (messages == null) {
            // 队列中没有消息
            return 0;
        }
        synchronized (messages) {
            return messages.size();
        }
    }

    // 添加未确认的消息
    public void addMessageWaitAck(String queueName, Message message) {
        ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,
                k -> new ConcurrentHashMap<>());
        messageHashMap.put(message.getMessageId(),message);
        System.out.println("[MemoryDataCenter] 消息进入待确认队列! messageId=" + message.getMessageId());
    }

    // 删除未确认的消息
    public void removeMessageWaitAck(String queueName, String messageId) {
        ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
        if (messageHashMap == null) {
            System.out.println("[MemoryDataCenter] 消息确认队列为空! 不存在该消息: messageId=" + messageId);
            return;
        }
        messageHashMap.remove(messageId);
        System.out.println("[MemoryDataCenter] 消息从待确认队列删除! messageId=" + messageId);
    }
    // 获取指定的未确认的消息
    public Message getMessageWaitAck(String queueName, String messageId) {
        ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
        if (messageHashMap == null) {
            return null;
        }
        return messageHashMap.get(messageId);
    }

    // 该方法是从硬盘上读取数据, 把硬盘中之前持久化存储的各个维度的数据都恢复到内存中.
    public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {
        // 1. 清空之前的所有的数据
        exchangeMap.clear();
        queueMap.clear();
        bindingsMap.clear();
        messageMap.clear();
        queueMessageMap.clear();
        // 2. 恢复所有的交换机数据
        List<Exchange> exchanges = diskDataCenter.selectAllExchanges();
        for (Exchange exchange: exchanges) {
            exchangeMap.put(exchange.getName(),exchange);
        }
        // 3. 恢复所有的队列数据
        List<MSGQueue> queues = diskDataCenter.selectAllQueues();
        for (MSGQueue queue: queues) {
            queueMap.put(queue.getName(),queue);
        }
        // 4. 恢复所有的绑定数据
        List<Binding> bindings = diskDataCenter.selectAllBindings();
        for (Binding binding : bindings) {
            ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k ->
                    new ConcurrentHashMap<>());
            bindingMap.put(binding.getQueueName(),binding);
        }
        // 5. 恢复所有的消息数据
        // 遍历所有的队列, 根据每个队列的名字 获取到所有的消息.
        for (MSGQueue queue : queues) {
            LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());
            queueMessageMap.put(queue.getName(),messages);
            for (Message message : messages) {
                messageMap.put(message.getMessageId(),message);
            }
        }
        // 对于 "未确认的消息" 这部分内存中的数据, 则不需要从硬盘恢复.
        // 如果用户取走了消息, 服务器在等待ack, 服务器重启了, 这时这个 "未被确认的消息", 就恢复成 "未被取走的消息".
    }
}








































